package main

import (
	"fmt"
	"github.com/IBM/sarama"
	"log"
	"strconv"
	"time"
)

var (
	kafkaAddrs = []string{"127.0.0.1:9092"}
)

const (
	RealTestTopic = "RealTestTopic"
	RealTestGroup = "RealTestGroup"

	RealTopic = "real_topic"
	RealGroup = "real_group"

	DelayTopic1 = "delay_topic1"
)

func main() {
	producerConfig := sarama.NewConfig()
	// 生产者配置～～
	producerConfig.Producer.Return.Successes = true
	producerConfig.Producer.RequiredAcks = sarama.WaitForAll
	// Notice Partitioner 消息分区的算法～
	producerConfig.Producer.Partitioner = sarama.NewHashPartitioner

	producer, err := sarama.NewSyncProducer(
		kafkaAddrs,
		producerConfig,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	userIDs := []string{"user1", "user1", "user1", "user1", "user2", "user3", "user4"}

	for i, userId := range userIDs {
		// 要发送的消息
		msg := &sarama.ProducerMessage{
			Topic: RealTopic,
			// Notice 在Key中设置: 根据userId将消息发送到指定分区～保证消息顺序消费～
			// 使用userId作为Key，确保相同userId的消息被发送到同一个分区
			Key:   sarama.StringEncoder(userId),
			Value: sarama.StringEncoder(userId + "-" + strconv.Itoa(i)),

			// Notice 在delayService设置delay，TimeStamp要传当前时间表示消息发出的时间～～～
			// Notice 然后delayService比较他那边的时间与这个这个Timestamp，过了 delay时间 就发送消息～
			Timestamp: time.Now(),
		}
		partition, offset, err := producer.SendMessage(msg)
		if err != nil {
			log.Println(err)
		}
		fmt.Printf("userId: %v, partition: %v, offset: %v \n", userId, partition, offset)
		time.Sleep(time.Millisecond)
	}

	time.Sleep(time.Second * 1)
}
